package io.sundial.scheduler.impl;

import io.sundial.balance.Balancer;
import io.sundial.balance.Balancing;
import io.sundial.balance.exception.BalancingException;
import io.sundial.core.context.Context;
import io.sundial.core.lifecycle.Lifecycle;
import io.sundial.core.lifecycle.exception.DestroyingException;
import io.sundial.core.lifecycle.exception.InitializingException;
import io.sundial.executor.Executor;
import io.sundial.executor.impl.ListeningExecutor;
import io.sundial.job.JobSharding;
import io.sundial.scheduler.Scheduler;
import io.sundial.task.Task;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * 抽象的作业均衡调度器
 *
 * @author Payne 646742615@qq.com
 * 2018/12/21 13:13
 */
public abstract class BalancingScheduler extends ListeningExecutor implements Scheduler, Balancing {
    private Balancer defaultBalancer;
    private ConcurrentMap<String, Balancer> supportBalancers;

    @Override
    protected void initializing(Context context) throws InitializingException {
        super.initializing(context);

        // 如果没有设置就从上下文中找到所有的。
        if (supportBalancers == null || supportBalancers.isEmpty()) {
            supportBalancers = new ConcurrentHashMap<>();
            Map<String, Balancer> map = context.fetch(Balancer.class);
            Collection<Balancer> collection = map.values();
            for (Balancer balancer : collection) {
                String algorithm = balancer.getAlgorithm();
                if (algorithm == null || algorithm.trim().isEmpty()) {
                    throw new InitializingException("balancer " + balancer.getClass() + " must declare an not null or blank algorithm");
                }
                supportBalancers.put(algorithm, balancer);
            }
        }

        // 如果没配置缺省均衡器，就选择第一个作为默认的均衡器。
        if (defaultBalancer == null) {
            if (supportBalancers.isEmpty()) {
                throw new InitializingException("please config at least one balancer");
            } else {
                defaultBalancer = supportBalancers.values().iterator().next();
            }
        }

        for (Balancer supportBalancer : supportBalancers.values()) {
            if (supportBalancer instanceof Lifecycle) {
                ((Lifecycle) supportBalancer).initialize(context);
            }
        }
        if (defaultBalancer instanceof Lifecycle) {
            ((Lifecycle) defaultBalancer).initialize(context);
        }
    }

    @Override
    protected void destroying() throws DestroyingException {
        super.destroying();

        for (Balancer supportBalancer : supportBalancers.values()) {
            if (supportBalancer instanceof Lifecycle) {
                ((Lifecycle) supportBalancer).destroy();
            }
        }
        if (defaultBalancer instanceof Lifecycle) {
            ((Lifecycle) defaultBalancer).destroy();
        }

        supportBalancers.clear();
        supportBalancers = null;

        defaultBalancer = null;
    }

    protected Map<String, List<JobSharding>> balance(Map<String, Executor.Information> executors, Task task) throws BalancingException {
        String expect = task.getBalanceAlgorithm();
        Balancer chosen = null;
        for (Balancer balancer : supportBalancers.values()) {
            String actual = balancer.getAlgorithm();
            if (Objects.equals(expect, actual)) {
                chosen = balancer;
                break;
            }
        }
        if (chosen == null && !supportBalancers.values().isEmpty()) {
            chosen = supportBalancers.values().iterator().next();
        }
        if (chosen == null) {
            throw new IllegalStateException("no available balancer");
        }
        return chosen.balance(executors, task);
    }

    @Override
    public boolean mount(Balancer balancer) {
        return supportBalancers.putIfAbsent(balancer.getAlgorithm(), balancer) == null;
    }

    @Override
    public boolean unmount(Balancer balancer) {
        return supportBalancers.remove(balancer.getAlgorithm(), balancer);
    }

    public Balancer getDefaultBalancer() {
        return defaultBalancer;
    }

    public void setDefaultBalancer(Balancer defaultBalancer) {
        this.defaultBalancer = defaultBalancer;
    }

    public ConcurrentMap<String, Balancer> getSupportBalancers() {
        return supportBalancers;
    }

    public void setSupportBalancers(ConcurrentMap<String, Balancer> supportBalancers) {
        this.supportBalancers = supportBalancers;
    }
}
